
Optimizing Models with Post-Training Quantization in Keras - Part I

Performing Facial Keypoints Detection with Post-Training Quantization in Keras
Created on January 11|Last edited on February 10
Welcome to part one of a short series on quantization in Keras. Today, we’ll be looking at post-training quantization. Let’s get started!

Introduction

Developing and training machine learning models is one thing, but deploying those models in the most optimized way possible is another altogether. And while there are many different ways to think about what “optimization” means, for our purposes today, we’ll define it as reducing the size of your model while still maintaining accuracy.
To understand a bit more of what we mean, let’s start with an example. Say that Ross is a machine learning engineer on a smartwatch project. He’s built a great model from data like heart beat, steps taken, calories burned, etc. with a simple goal of improving the health of his users and predicting things like heart disease.
Now, when he was developing this model, Ross gathered a dataset with millions of rows and trained a hefty model—5 GB with 95% accuracy. Sounds great! Ross is happy.
Not for long though. The problem is that Ross was using a large system with multiple GPUs and essentially unlimited storage. The smartwatch he wants to deploy his model on? It can store just 1 GB.
This is a model that needs to be optimized.
This is actually quite a common problem, especially for edge devices. We want to reduce the size of the model without appreciably sacrificing accuracy. We do in fact have techniques to do this: ways for Ross to shrink his model to something like 800 MB while losing only a point or two of accuracy.
One way to do this? Quantization. In this post, we’re going to focus on what’s called post-training quantization (there’s another post coming next week about quantization-aware training). We’ll explain the important concepts you need to know, implement these techniques, and see in real time how this affects our model and its accuracy.
Today, we’ll be doing this with a typical facial recognition dataset. Let's dig in:

Our Data and Our Goal

We’ll be using this facial keypoints dataset from Kaggle for our tutorial and experiments today. The goal of that competition, simply, was to detect keypoints on pictures of people’s faces.
Essentially, a keypoint helps a model understand and orient a person’s face. They are, well, key points for a model: generally things like the tip of the nose, the center of an eye, or the corner of the mouth, rather than the center of a cheek or the forehead. Each keypoint is given by the x and y pixel coordinates of that location. In this dataset we are given the x and y coordinates of 15 such keypoints per grayscale image, so the model has a total of 30 target values to predict per picture. The input image is given in the last field of the data files and consists of a list of pixels (ordered by row), as integers in [0, 255]. The images are 96x96 pixels.
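To make the format concrete, here is a minimal sketch of how a single row breaks down into an image and its 30 target values (the file name is assumed for illustration; the actual loading code we use comes later in the post):

import numpy as np
import pandas as pd

# Read a single row of the training data (file name assumed for illustration)
row = pd.read_csv("training.csv", nrows=1).iloc[0]

# The 'Image' column is a space-separated string of 96 * 96 = 9216 pixel values
image = np.fromstring(row["Image"], dtype=int, sep=" ").reshape(96, 96)

# The remaining 30 columns are the (x, y) coordinates of the 15 keypoints
keypoints = row.drop("Image").values.astype(np.float32)

print(image.shape, keypoints.shape)  # (96, 96) (30,)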
Again, our goal here is pretty basic: to detect the locations of the keypoints and to use quantization to reduce the size of our model.

Preliminaries

We start by importing the necessary libraries. We will see how we use them as we move ahead.
!pip install wandb

import os
import time
import pathlib

import pandas as pd
import numpy as np

# Progress Bar
from tqdm.notebook import tqdm

# Data Visualization
import matplotlib.pyplot as plt
%matplotlib inline

# Sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Tensorflow
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications.resnet50 import ResNet50
# import tensorflow_model_optimization as tfmot

# wandb
import wandb
from wandb.keras import WandbCallback

wandb.login()

Global Configuration

It’s always good practice to keep a separate configuration file or class that holds the parameters, as shown below. We have four files in the dataset -
  1. Training File - a list of 7,049 training images. Each row contains the (x, y) coordinates for 15 keypoints, plus the image data as a row-ordered list of pixels.
  2. Test File - a list of 1,783 test images. Each row contains an ImageId and the image data as a row-ordered list of pixels.
  3. Id Lookup Table - specifies which keypoint is to be predicted for each row of the submission.
  4. Sample Submission - the format of the final submission file containing the predicted locations of the keypoints.
class config:
    DIRECTORY_PATH = "./Facial Keypoints Detection"
    TRAIN_FILE_PATH = DIRECTORY_PATH + "/input/training.csv"
    TEST_FILE_PATH = DIRECTORY_PATH + "/input/test.csv"
    ID_LOOKUP_TABLE_PATH = DIRECTORY_PATH + "/input/IdLookupTable.csv"
    SAMPLE_FILE_PATH = DIRECTORY_PATH + "/input/SampleSubmission.csv"
    SUBMISSION_FILE_PATH = DIRECTORY_PATH + "/outputs/submission.csv"

    BASELINE_CHECKPOINT_PATH = DIRECTORY_PATH + "/models/resnet50_baseline/checkpoint.ckpt"
    POST_TRAINING_QUANTIZATION_PATH = DIRECTORY_PATH + "/models/post_training_quantization/resnet50_quant.tflite"

    VALIDATION_DATA_SIZE = 0.2

    BATCH_SIZE = 32
    NUM_EPOCHS = 150
    LEARNING_RATE = 0.03

Load Data

Now that we have set our configuration, we load our dataset using the load_data() function.
def load_data():
    """
    Function to load the train and test datasets
    """
    X_train = pd.read_csv(config.TRAIN_FILE_PATH)
    X_test_original = pd.read_csv(config.TEST_FILE_PATH)

    return X_train, X_test_original

Check Missing Values

The dataset contains many missing values: on inspecting the data, we find that some images are annotated with all 15 keypoints while others have only 4.
We can take one of two approaches: either fill or remove the missing values and train one model on the entire dataset, or train two models, one for the data with 4 keypoints and one for the data with 15 keypoints, and then combine their predictions.
For simplicity, here we drop the samples with missing values, since we want to train a single model on the entire dataset and report the changes observed through optimization.
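If you want to verify this pattern yourself, here is a quick sketch (assuming the raw training DataFrame returned by load_data()):

# Inspect missing values per keypoint column (sketch)
X_train, _ = load_data()

print(X_train.isnull().sum())  # missing count per column
print("rows with all 15 keypoints:", X_train.dropna().shape[0], "out of", X_train.shape[0])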
def remove_missing_values():
    """
    Function to drop the samples with missing values
    """
    X_train, X_test_original = load_data()

    X_train = X_train.dropna()
    y_train = X_train.drop(['Image'], axis=1)

    return X_train, y_train, X_test_original

Obtain Images

In the training data the images are stored as space-separated pixel strings, so we need to convert them into the correct format.
def get_images():
    """
    Function to get the correct image format from the data
    """
    X_train, y_train, X_test_original = remove_missing_values()

    # Train Data
    X_train = X_train.Image
    X_train = X_train.apply(lambda x: np.fromstring(x, dtype=int, sep=' ').reshape((96, 96)))

    # Test Data
    X_test_original = X_test_original.Image
    X_test_original = X_test_original.apply(lambda x: np.fromstring(x, dtype=int, sep=' ').reshape((96, 96)))

    X_train /= 255.0
    X_test_original /= 255.0

    return X_train, y_train, X_test_original
Let us view some images with their keypoints to understand the problem better.
# View Images with their keypoints

X_train, y_train, _ = get_images()

fig = plt.figure(figsize=(9, 9))

for i in range(9):
    ax = fig.add_subplot(3, 3, i + 1)
    plt.imshow(X_train.iloc[i])
    # Plot the (x, y) pair for each of the 15 keypoints of this image
    for j in range(1, 31, 2):
        plt.plot(y_train.iloc[i][j - 1], y_train.iloc[i][j], 'ro')

plt.show()
Training Images with their Keypoints
As you can clearly see, each image has 15 keypoints, shown as red dots at different locations on the face. These locations (the x and y pixel coordinates) are what we are trying to predict.

Prepare the Data

We process the data further to convert it to numpy arrays, then flatten and reshape them into the desired format.
def process_data():
    """
    Function to convert, flatten and reshape the arrays
    """
    X_train, y_train, X_test_original = get_images()

    # Convert to arrays
    X_train = X_train.to_numpy()
    X_test_original = X_test_original.to_numpy()

    # Flatten the arrays
    X_train = X_train.flatten()
    X_test_original = X_test_original.flatten()

    X_train = np.concatenate(X_train, axis=0)
    X_train = np.concatenate(X_train, axis=0)

    X_test_original = np.concatenate(X_test_original, axis=0)
    X_test_original = np.concatenate(X_test_original, axis=0)

    # Reshape the arrays to get correct image dimensions
    X_train = X_train.reshape((-1, 96, 96))
    X_test_original = X_test_original.reshape((-1, 96, 96))

    return X_train, y_train, X_test_original
To prepare our validation dataset, we will simply use the train_test_split function offered by scikit-learn.
# Obtain Training and Validation Data Splits

def get_data_split():
    """
    Function to obtain train, validation and test data splits
    """
    X_train, y_train, X_test_original = process_data()

    X_train, X_valid, y_train, y_valid = train_test_split(
        X_train,
        y_train,
        test_size=config.VALIDATION_DATA_SIZE,
        random_state=0
    )

    X_valid, X_test, y_valid, y_test = train_test_split(
        X_valid,
        y_valid,
        test_size=config.VALIDATION_DATA_SIZE,
        random_state=0
    )

    return X_train, X_valid, X_test, y_train, y_valid, y_test
After preparing the training, validation and test datasets, we convert the images from 1 channel to 3 channels (the pretrained ResNet50 expects RGB input), which is done in the get_correct_dimensions() function.
def get_correct_dimensions():
    """
    Function to convert the data into 3 channels
    """
    X_train, X_valid, X_test, y_train, y_valid, y_test = get_data_split()

    # Reshape all arrays
    X_train = X_train.reshape((-1, 96, 96, 1))
    X_valid = X_valid.reshape((-1, 96, 96, 1))
    X_test = X_test.reshape((-1, 96, 96, 1))

    # Concatenate the data with itself to obtain 3 Channels
    X_train = np.concatenate((X_train, X_train, X_train), axis=-1)
    X_test = np.concatenate((X_test, X_test, X_test), axis=-1)
    X_valid = np.concatenate((X_valid, X_valid, X_valid), axis=-1)

    return X_train, X_test, X_valid, y_train, y_valid, y_test

ResNet50 Baseline Model

Up to this point we have preprocessed our dataset and obtained it in the correct format, so the data is now ready for training.
To train the model, we will use transfer learning with a pretrained ResNet50. You can try larger models like EfficientNets too, but for our use case a single ResNet model is enough to achieve a decent score.
Before training the model we need to define certain callbacks. In this case we define 4 callbacks -
  1. TensorBoard callback to log training metrics
  2. Early Stopping callback to prevent the model from overfitting
  3. Checkpoint callback to save the model's weights
  4. WandB callback to log everything to our wandb dashboard
def get_callbacks(checkpoint_path, callbacks=tf.keras.callbacks):
    """
    Function to declare the callbacks
    """
    early_stopping = callbacks.EarlyStopping(
        min_delta=0.1,  # minimum amount of change to count as an improvement
        patience=20,    # how many epochs to wait before stopping
        restore_best_weights=True,
    )

    # Create a callback that saves the model's weights
    checkpoint_callback = keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path,
        save_weights_only=True,
    )

    # Define the callbacks
    callbacks = [
        keras.callbacks.TensorBoard(log_dir='./logs'),
        early_stopping,
        checkpoint_callback,
        WandbCallback()
    ]

    return callbacks
Then we define our ResNet50 model with pretrained weights. We freeze most of its layers, leave only the later ones trainable, and add our own layers on top afterwards.
def get_custom_resnet():
    """
    Function to load a pretrained ResNet50 model and freeze certain layers
    """
    inputs = keras.Input(shape=(96, 96, 3))

    # Define the ResNet50 model with pretrained weights (Transfer Learning).
    # include_top=False because our inputs are 96x96, not the 224x224 that
    # the ImageNet classification head expects.
    res_model = ResNet50(
        input_tensor=inputs,
        weights='imagenet',
        include_top=False
    )

    # We will only train the later layers; the earlier ones stay frozen
    for layer in res_model.layers[:143]:
        layer.trainable = False

    return res_model
Finally we define the entire architecture consisting of 3 simple data augmentations, followed by the ResNet model, a flatten layer and a dense layer.
# Final Architecture with Data Augmentations
def setup_model():
    """
    Function to define the final model architecture with data augmentations
    """
    res_model = get_custom_resnet()

    model = keras.models.Sequential()
    model.add(keras.layers.RandomRotation(factor=0.5))
    model.add(keras.layers.RandomFlip())
    model.add(keras.layers.RandomContrast(factor=0.5))
    model.add(res_model)
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(30, activation='relu'))

    return model

def setup_pretrained_model(checkpoint_path):
    """
    Function to load the pretrained model
    """
    model = make_model()
    model.load_weights(checkpoint_path)

    return model
Now we define the optimizer, Adam in our case, and compile the model for training. Notice that we are using root mean squared error (RMSE) as the metric here.
def make_model():
    """
    Function to load model, define optimizer and compile the model
    """
    model = setup_model()

    # Define Optimizer
    optimizer = keras.optimizers.Adam(learning_rate=config.LEARNING_RATE)

    # Compile the model
    model.compile(
        optimizer=optimizer,
        loss='mean_squared_error',
        metrics=[tf.keras.metrics.RootMeanSquaredError()]
    )

    return model
We then load our datasets, obtain the callbacks and set up the model training function.
def run_model(checkpoint_path):
    """
    Function to fit the model on the dataset and report the training time
    """
    X_train, X_test, X_valid, y_train, y_valid, y_test = get_correct_dimensions()

    model = make_model()

    callbacks = get_callbacks(checkpoint_path=checkpoint_path)

    # Train model and save history
    start = time.time()

    history = model.fit(
        X_train,
        y_train,
        validation_data=(X_valid, y_valid),
        batch_size=config.BATCH_SIZE,
        epochs=config.NUM_EPOCHS,
        callbacks=callbacks
    )

    end = time.time()

    print(f"Total Time taken for Model Training: {end - start} seconds.")

    return history

Run Model

We initialize wandb to record our run and start training our model.
run = wandb.init(project='Post Training Quantization')

history = run_model(config.BASELINE_CHECKPOINT_PATH)

wandb.finish()

[W&B run panel: training and validation loss curves for the baseline model]

From the charts above, it is clear that both the training and the validation loss decrease as the number of epochs increases, so the model is not overfitting.

Model Evaluation and Size

We will create two functions: evaluate_model() to compute the metric (root mean squared error) on the held-out dataset, and get_model_size() to report the size of the model on disk.
def evaluate_model(checkpoint_path):
    """
    Function to calculate loss and root mean squared error on the held-out dataset
    """
    model = setup_pretrained_model(checkpoint_path)

    _, X_test, _, _, _, y_test = get_correct_dimensions()

    loss, rmse = model.evaluate(X_test, y_test)  # returns loss and metrics
    print("loss: %.2f" % loss)
    print("rmse: %.2f" % rmse)


def get_model_size(path, checkpoint=True):
    """
    Function to obtain the size of a model on disk
    """
    if checkpoint:
        filepath = path + '.data-00000-of-00001'
    else:
        filepath = path

    print(f"{os.path.getsize(filepath)/float(1<<20):,.0f} MB")

# Evaluate Model
evaluate_model(config.BASELINE_CHECKPOINT_PATH)

# Model Size
get_model_size(config.BASELINE_CHECKPOINT_PATH)

Baseline Model Conclusion:

These are the results obtained from the baseline model we trained.
Model Description -
  • ResNet50 with Pretrained Weights along with Data Augmentations
Result -
  • Root Mean Squared Error = 3.05
  • Training Time = 776 seconds
  • Model Size = 228 MB
We will refer back to this conclusion after we quantize the model.

Post-Training Quantization

Post-training quantization includes general techniques to reduce CPU and hardware accelerator latency, processing, power, and model size with little degradation in model accuracy.
These techniques can be performed on an already-trained float TensorFlow model and are applied during conversion to TensorFlow Lite; they are enabled as options in the TensorFlow Lite converter.
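The variant we use below is the converter's default, dynamic range quantization, which stores the weights as 8-bit integers together with a scale factor and dequantizes them on the fly at inference time. Here is a rough numpy sketch of the idea (purely illustrative, not TFLite's exact internal scheme):

# Illustrative int8 weight quantization (not TFLite's exact internals)
w = np.random.randn(4, 4).astype(np.float32)   # toy float32 weight tensor

scale = np.abs(w).max() / 127.0                # symmetric per-tensor scale
w_int8 = np.clip(np.round(w / scale), -127, 127).astype(np.int8)  # stored 4x smaller

w_restored = w_int8.astype(np.float32) * scale # dequantized at inference time
print("max abs error:", np.abs(w - w_restored).max())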
To implement post-training quantization, in Step 1 we load our fine-tuned baseline model and build it with the expected input shape.
# Load Baseline Model
model = setup_pretrained_model(config.BASELINE_CHECKPOINT_PATH)

# Build Model
model.build((None, 96,96,3))

Convert Model to TensorFlow Lite Format

In Step 2 we convert our fine-tuned model to a TensorFlow Lite model.
This is easily done with the Python TFLiteConverter API.
# Convert to TensorFlow Lite Model with optimization

converter = tf.lite.TFLiteConverter.from_keras_model(model) # Define Converter
converter.optimizations = [tf.lite.Optimize.DEFAULT] # Activate quantization
tflite_quant_model = converter.convert() # Convert the model
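Passing Optimize.DEFAULT with no further options gives us dynamic range quantization. The converter also exposes other post-training modes; for instance, here is a sketch of float16 quantization, which we do not use in this post but which roughly halves the model size by storing weights as float16:

# Alternative (not used here): post-training float16 quantization
converter_fp16 = tf.lite.TFLiteConverter.from_keras_model(model)
converter_fp16.optimizations = [tf.lite.Optimize.DEFAULT]
converter_fp16.target_spec.supported_types = [tf.float16]
tflite_fp16_model = converter_fp16.convert()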
Now that our model has been converted to the TFLite format, we need to save it. We create a separate directory and write the TFLite model to a file inside it.
# Path of TFLite Models
dir_path = config.DIRECTORY_PATH + "/models/post_training_quantization/"

tflite_models_dir = pathlib.Path(dir_path)
tflite_models_dir.mkdir(exist_ok=True, parents=True)

tflite_model_quant_file = tflite_models_dir/"resnet50_quant.tflite"

# Write File
tflite_model_quant_file.write_bytes(tflite_quant_model)

# Model Size
get_model_size(config.POST_TRAINING_QUANTIZATION_PATH, checkpoint = False)
Here comes the magic: the new quantized model is just 25 MB! We were able to reduce the model size by roughly 10 times just by quantization. Now you must be thinking: okay, but what about the accuracy of the model? Let's run an evaluation on the quantized model to find out.

Run the Quantized Model

We load our datasets.
# Obtain datasets
X_train, X_test, X_valid, y_train, y_valid, y_test = get_correct_dimensions()
To run a TensorFlow Lite model we need to use the TensorFlow Lite Interpreter and load the quantized model into the interpreter.
# Load Model into Interpreter

interpreter = tf.lite.Interpreter(model_path=str(tflite_model_quant_file))
interpreter.allocate_tensors()

Evaluate One Image

We first test our inference on a single image to check if we are able to get desired outputs.
test_image = X_test[0].astype(np.float32)

input_index = interpreter.get_input_details()[0]["index"]
output_index = interpreter.get_output_details()[0]["index"]

# Add a batch dimension and run inference on a single image
interpreter.set_tensor(input_index, tf.expand_dims(test_image, axis=0))

interpreter.invoke()
predictions = (interpreter.get_tensor(output_index)).flatten()

# RMSE between the true and predicted keypoints for this image
mean_squared_error(y_test.iloc[0, :].values, predictions, squared=False)

Evaluate on the Validation Dataset

Now that we are sure our model inference works, we run it on the entire held-out dataset to obtain the RMSE score.
# A helper function to evaluate the TFLite model on the held-out dataset

def evaluate_quant_model(interpreter):

    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]

    # Run predictions on every image in the held-out dataset
    final_predictions = []
    rmse = 0

    for test_image, target in tqdm(zip(X_test, y_test.values), total=len(X_test)):

        # Pre-processing: add batch dimension and convert to float32 to match
        # the model's input data format
        test_image = test_image.astype(np.float32)
        interpreter.set_tensor(input_index, tf.expand_dims(test_image, axis=0))

        # Run inference
        interpreter.invoke()

        # Post-processing:
        output = interpreter.tensor(output_index)

        # Obtain Predictions
        predictions = (interpreter.get_tensor(output_index)).flatten()
        final_predictions.append(predictions)

        # Accumulate the average RMSE over the dataset
        rmse += (mean_squared_error(target, predictions, squared=False)) / len(X_test)

    return rmse

rmse = evaluate_quant_model(interpreter)

print(rmse)
Upon evaluation you will find the quantized model's RMSE to be 2.84, which is even slightly better than our original baseline model's RMSE of 3.05!
By now you can see why quantization is so useful: it can dramatically reduce model size while typically costing little accuracy, and occasionally, as here, the quantized model even scores slightly better.

Post-Training Quantization Model Conclusion:

Here are the results from the Quantized Model.
Model Description -
  • ResNet50 with Pretrained Weights along with Data Augmentations
  • Post-Training Dynamic Range Quantization
Result -
  • Root Mean Squared Error = 2.84 (slightly better than the baseline model)
  • Evaluation Time = 199 seconds
  • Model Size = 25 MB (10 times smaller than baseline model)

TFLite Inference

In the final step, we will perform inference on the original test dataset provided in the competition. The steps are similar to those in the last section.
def tflite_inference(interpreter):
    """
    Function to perform inference with a tflite model
    """
    # Prepare Test Dataset
    _, _, X_test_original = process_data()
    X_test_original = X_test_original.reshape((-1, 96, 96, 1))
    X_test_original = np.concatenate((X_test_original, X_test_original, X_test_original), axis=-1)

    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]

    # Run predictions on every image in the original test dataset
    final_predictions = []

    for test_image in tqdm(X_test_original, total=len(X_test_original)):

        # Pre-processing: add batch dimension and convert to float32 to match
        # the model's input data format
        test_image = test_image.astype(np.float32)
        interpreter.set_tensor(input_index, tf.expand_dims(test_image, axis=0))

        # Run inference
        interpreter.invoke()

        # Post-processing:
        output = interpreter.tensor(output_index)

        # Obtain Predictions
        predictions = (interpreter.get_tensor(output_index)).flatten()
        final_predictions.append(predictions)

    return final_predictions

# Load the TFLite model and allocate tensors.
interpreter = tf.lite.Interpreter(model_path=config.POST_TRAINING_QUANTIZATION_PATH)
interpreter.allocate_tensors()

predictions = tflite_inference(interpreter)
After we have obtained the predictions, we create a submission file for the competition, which contains the Row Id along with the location of the keypoint.
def generate_submission(predictions):
    """
    Function to create a submission file from the model predictions
    """
    # Convert list to array
    predictions = np.array(predictions)

    # Get data
    _, _, _, y_train, _, _ = get_correct_dimensions()
    cols = list(y_train.columns)

    # Load Id LookUp Table
    df = pd.read_csv(config.ID_LOOKUP_TABLE_PATH)

    # Fill the predicted location for each (ImageId, FeatureName) pair
    for i in range(df.shape[0]):
        df.loc[i, 'Location'] = predictions[df.ImageId[i] - 1][cols.index(df.FeatureName[i])]

    # Drop Columns which are not required in submission
    df.drop(['ImageId', 'FeatureName'], axis=1, inplace=True)

    # Set Row Id as index
    df = df.set_index(['RowId'])

    # Save submission file
    df.to_csv(config.SUBMISSION_FILE_PATH)

generate_submission(predictions)
We will plot a couple of images from the test set to see how well our model performs.
def plot_test_image(df, prediction, index):
    """
    Function to plot images with their predictions for the test set
    """
    image = plt.imshow(df[index])
    l = []
    for i in range(1, 31, 2):
        l.append(plt.plot(prediction[index][i - 1], prediction[index][i], 'ro'))
    return image, l


# Rebuild the original (competition) test images, since the predictions above were made on them
_, _, X_test_original = process_data()

fig = plt.figure(figsize=(20, 20))

for i in range(20):
    ax = fig.add_subplot(5, 4, i + 1)
    plot_test_image(X_test_original, predictions, i)

plt.show()


As you can see, the model is able to detect most of the keypoints accurately, though there is always scope for improvement!
This was Part 1 of the Quantization with Keras series. In the next part we will continue this example to understand quantization-aware training and see how the results compare. We will also discuss more strategies to improve the model's performance.
The entire code is available on GitHub in the repository facial-keypoints-detection.
If you still face any difficulties reach out to me on LinkedIn or Twitter, my messages are open :)